/*
 * File      : dataqueue.c
 * This file is part of RT-Thread RTOS
 * COPYRIGHT (C) 2012, RT-Thread Development Team
 *
 * The license and distribution terms for this file may be
 * found in the file LICENSE in this distribution or at
 * http://www.rt-thread.org/license/LICENSE
 *
 * Change Logs:
 * Date           Author       Notes
 * 2012-09-30     Bernard      first version.
 */
#include <rtthread.h>
#include <rtdevice.h>
#include <rthw.h>

struct rt_data_item
{
	void* data_ptr;
	rt_size_t data_size;
};

rt_err_t rt_data_queue_init(struct rt_data_queue* queue, rt_uint16_t size, rt_uint16_t lwm,
	void (*evt_notify)(struct rt_data_queue* queue, rt_uint32_t event))
{
    RT_ASSERT(queue != RT_NULL);

    queue->evt_notify = evt_notify;

    queue->size = size;
    queue->lwm = lwm;
	queue->waiting_lwm = RT_FALSE;

    queue->get_index = 0;
    queue->put_index = 0;

    rt_list_init(&(queue->suspended_push_list));
    rt_list_init(&(queue->suspended_pop_list));

    queue->queue = (struct rt_data_item*) rt_malloc(sizeof(struct rt_data_item) * size);
    if (queue->queue == RT_NULL)
    {
        return -RT_ENOMEM;
    }

    return RT_EOK;
}
RTM_EXPORT(rt_data_queue_init);

rt_err_t rt_data_queue_push(struct rt_data_queue* queue, void* data_ptr, rt_size_t data_size, rt_int32_t timeout)
{
    rt_uint16_t mask;
    rt_ubase_t  level;
    rt_thread_t thread;
	rt_err_t    result;
	
	RT_ASSERT(queue != RT_NULL);

	result = RT_EOK;
    thread = rt_thread_self();
	mask = queue->size - 1;

    level = rt_hw_interrupt_disable();
    while (queue->put_index - queue->get_index == queue->size)
    {
		queue->waiting_lwm = RT_TRUE;

        /* queue is full */
        if (timeout == 0)
        {
        	result = -RT_ETIMEOUT;

			goto __exit;
        }
		
		/* current context checking */
		RT_DEBUG_NOT_IN_INTERRUPT;

		/* reset thread error number */
		thread->error = RT_EOK;
		
		/* suspend thread on the push list */
		rt_thread_suspend(thread);
		rt_list_insert_before(&(queue->suspended_push_list), &(thread->tlist));
		/* start timer */
		if (timeout > 0)
		{
			/* reset the timeout of thread timer and start it */
			rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout);
			rt_timer_start(&(thread->thread_timer));
		}

		/* enable interrupt */
		rt_hw_interrupt_enable(level);

		/* do schedule */
		rt_schedule();
		
		/* thread is waked up */
		result = thread->error;
		level = rt_hw_interrupt_disable();
		if (result != RT_EOK) goto __exit;
    }

	queue->queue[queue->put_index & mask].data_ptr  = data_ptr;
	queue->queue[queue->put_index & mask].data_size = data_size;
	queue->put_index += 1;

	if (!rt_list_isempty(&(queue->suspended_pop_list)))
	{
		/* there is at least one thread in suspended list */

		/* get thread entry */
		thread = rt_list_entry(queue->suspended_pop_list.next, struct rt_thread, tlist);
		
		/* resume it */
		rt_thread_resume(thread);
		rt_hw_interrupt_enable(level);

		/* perform a schedule */
		rt_schedule();

		return result;
	}

__exit:
    rt_hw_interrupt_enable(level);
	if ((result == RT_EOK) && queue->evt_notify != RT_NULL)
	{
		queue->evt_notify(queue, RT_DATAQUEUE_EVENT_PUSH);
	}

    return result;
}
RTM_EXPORT(rt_data_queue_push);

rt_err_t rt_data_queue_pop(struct rt_data_queue* queue, void** data_ptr, rt_size_t *size, 
	rt_int32_t timeout)
{
    rt_ubase_t  level;
    rt_thread_t thread;
	rt_err_t    result;
	rt_uint16_t mask;
	
	RT_ASSERT(queue != RT_NULL);
	RT_ASSERT(data_ptr != RT_NULL);
	RT_ASSERT(size != RT_NULL);

	result = RT_EOK;
    thread = rt_thread_self();
	mask = queue->size - 1;

    level = rt_hw_interrupt_disable();
	while (queue->get_index == queue->put_index)
	{
		/* queue is empty */
		if (timeout == 0)
		{
			result = -RT_ETIMEOUT;
			goto __exit;
		}

		/* current context checking */
		RT_DEBUG_NOT_IN_INTERRUPT;

		/* reset thread error number */
		thread->error = RT_EOK;
		
		/* suspend thread on the pop list */
		rt_thread_suspend(thread);
		rt_list_insert_before(&(queue->suspended_pop_list), &(thread->tlist));
		/* start timer */
		if (timeout > 0)
		{
			/* reset the timeout of thread timer and start it */
			rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout);
			rt_timer_start(&(thread->thread_timer));
		}

		/* enable interrupt */
		rt_hw_interrupt_enable(level);

		/* do schedule */
		rt_schedule();

		/* thread is waked up */
		result = thread->error;
		level = rt_hw_interrupt_disable();
		if (result != RT_EOK) goto __exit;
	}

	*data_ptr = queue->queue[queue->get_index & mask].data_ptr;
	*size     = queue->queue[queue->get_index & mask].data_size;

	queue->get_index += 1;

	if ((queue->waiting_lwm == RT_TRUE) && 
		(queue->put_index - queue->get_index) <= queue->lwm)
	{
		queue->waiting_lwm = RT_FALSE;

		/* there is at least one thread in suspended list and less than low water mark */
		if (!rt_list_isempty(&(queue->suspended_push_list)))
		{
			/* get thread entry */
			thread = rt_list_entry(queue->suspended_push_list.next, struct rt_thread, tlist);

			/* resume it */
			rt_thread_resume(thread);
			rt_hw_interrupt_enable(level);

			/* perform a schedule */
			rt_schedule();
		}

		if (queue->evt_notify != RT_NULL)
			queue->evt_notify(queue, RT_DATAQUEUE_EVENT_LWM);

		return result;
	}

__exit:
    rt_hw_interrupt_enable(level);
	if ((result == RT_EOK) && (queue->evt_notify != RT_NULL))
	{
		queue->evt_notify(queue, RT_DATAQUEUE_EVENT_POP);
	}

	return result;
}
RTM_EXPORT(rt_data_queue_pop);

rt_err_t rt_data_queue_peak(struct rt_data_queue* queue, void** data_ptr, rt_size_t *size)
{
    rt_ubase_t  level;
	rt_uint16_t mask;

	RT_ASSERT(queue != RT_NULL);

	mask = queue->size - 1;

    level = rt_hw_interrupt_disable();

	if (queue->get_index == queue->put_index) 
	{
	    rt_hw_interrupt_enable(level);
		return -RT_EEMPTY;
	}
	
	*data_ptr = queue->queue[queue->get_index & mask].data_ptr;
	*size	  = queue->queue[queue->get_index & mask].data_size;

    rt_hw_interrupt_enable(level);
	return RT_EOK;
}
RTM_EXPORT(rt_data_queue_peak);

void rt_data_queue_reset(struct rt_data_queue* queue)
{
	struct rt_thread *thread;
	register rt_ubase_t temp;

	rt_enter_critical();
	/* wakeup all suspend threads */

	/* resume on pop list */
	while (!rt_list_isempty(&(queue->suspended_pop_list)))
	{
		/* disable interrupt */
		temp = rt_hw_interrupt_disable();

		/* get next suspend thread */
		thread = rt_list_entry(queue->suspended_pop_list.next, struct rt_thread, tlist);
		/* set error code to RT_ERROR */
		thread->error = -RT_ERROR;

		/*
		 * resume thread
		 * In rt_thread_resume function, it will remove current thread from
		 * suspend list
		 */
		rt_thread_resume(thread);

		/* enable interrupt */
		rt_hw_interrupt_enable(temp);
	}

	/* resume on push list */
	while (!rt_list_isempty(&(queue->suspended_push_list)))
	{
		/* disable interrupt */
		temp = rt_hw_interrupt_disable();

		/* get next suspend thread */
		thread = rt_list_entry(queue->suspended_push_list.next, struct rt_thread, tlist);
		/* set error code to RT_ERROR */
		thread->error = -RT_ERROR;

		/*
		 * resume thread
		 * In rt_thread_resume function, it will remove current thread from
		 * suspend list
		 */
		rt_thread_resume(thread);

		/* enable interrupt */
		rt_hw_interrupt_enable(temp);
	}
	rt_exit_critical();

	rt_schedule();
}
RTM_EXPORT(rt_data_queue_reset);

